--- title: Sensors keywords: fastai sidebar: home_sidebar summary: "Reads and packages auxiliary sensor (IMU/GPS) data over UART" description: "Reads and packages auxiliary sensor (IMU/GPS) data over UART" nb_path: "nbs/07_sensors.ipynb" ---
{% raw %}
{% endraw %}

{% include tip.html content='This module can be imported using from openhsi.sensors import *' %}{% include warning.html content='Still experimental. Stay tuned.' %}

{% raw %}
{% endraw %} {% raw %}
{% endraw %} {% raw %}
{% endraw %}

The pyserial code below reads and decodes the sensor data packets. It is adapted from one of my past projects.

Todo: Read the RTC timestamp at the beginning of each collection, because the data packets only contain timestamp offsets.

{% raw %}

read_packet[source]

read_packet(header:chr='*', num_bytes:int=80, timeout:float=6.0)

Reads `num_bytes` of a data packet starting with `header` and timing out after `timeout` seconds if packet is invalid.
{% endraw %} {% raw %}

decode_packet[source]

decode_packet(buff:byte string=None)

Decode `buff` into a list of decoded variables
{% endraw %} {% raw %}
{% endraw %} {% raw %}
# Pseudocode sketch (not runnable as written, e.g. `remove event` below):
# poll the `go_but` GPIO input; call record() while it reads True,
# otherwise call to_DF() and save.
while True:
    GPIO.setmode(GPIO.BCM) # BCM pin-numbering scheme from Raspberry Pi
    GPIO.setup(go_but, GPIO.IN)
    if GPIO.input(go_but) == True:
        #bounce
        record()
    
    else:
        to_DF()
        #save with time delta
        
        remove event
        
        
    
{% endraw %} {% raw %}
# Pseudocode sketch (not runnable as written): outline of the planned
# collection loop. While `go_but` reads True: ready the camera, mount storage,
# start a separate process (GPS?), collect and save. Otherwise: sleep, join the
# process and unmount the device through os.system().
while True:
    GPIO.setmode(GPIO.BCM) # BCM pin-numbering scheme from Raspberry Pi
    GPIO.setup(go_but, GPIO.IN)
    if GPIO.input(go_but) == True:
        
        camera ready
        mount 
        p = Process(GPS?)
        collect
        save
    else:
        sleep
        p.join
        "sudo unmount /dev/..."
        os.system()
        
{% endraw %}

Streaming dashboard

View the XBee status on a dashboard that shows the last 100 points.

{% raw %}
import serial
import sched, time
import pandas as pd
import numpy as np
import param
import panel as pn
import hvplot.pandas
import hvplot.streamz
import holoviews as hv
from holoviews.element.tiles import EsriImagery
from holoviews.selection import link_selections
from datashader.utils import lnglat_to_meters
from streamz.dataframe import PeriodicDataFrame
hv.extension('bokeh',logo=False)
from holoviews.streams import Pipe, Buffer

class SensorStream():
    """Streaming dashboard for auxiliary sensor packets read from a serial port.

    Each packet is one line from the port, laid out in native byte order as:

    ======  ========  ==========================================
    bytes   type      meaning
    ======  ========  ==========================================
    0       char      header, ``'*'``
    1-4     int32     latitude  [deg * 1e7]
    5-8     int32     longitude [deg * 1e7]
    9       uint8     number of satellites in view
    10-13   float32   temperature [deg C]
    14-17   float32   pressure [hPa]
    18-21   float32   relative humidity [%]
    22      uint8     calibration status, 2 bits each: sys, gyro, accel, mag
    ======  ========  ==========================================

    Decoded packets accumulate in ``self.data`` (list of rows) and
    ``self.data_df`` (DataFrame); the plots show the last ``buff_len`` rows.
    """

    # Column order of one decoded packet; shared by `__init__` and `read`.
    COLUMNS = ["lat","lon","sats","temp","pressure","humidity","sys_cal","gyro_cal","accel_cal","mag_cal"]
    HEADER = b"*"         # first byte of every valid packet
    MIN_PACKET_LEN = 23   # header + 22 payload bytes (byte 22 is the last one decoded)

    def __init__(self, baudrate=115_200, port="/dev/cu.usbserial-DN05TVTD",buff_len:int = 100, jitter_deg:float = 1.0):
        """Open the serial port and create the streams, plots and widgets.

        Args:
            baudrate: serial baud rate.
            port: serial device to open (8 data bits, no parity, 1 stop bit).
            buff_len: number of most recent points kept by each plot.
            jitter_deg: upper bound of the uniform random offset [deg] added to
                every decoded latitude and longitude. The default (1.0) keeps
                the historical behaviour of this dashboard; pass 0 to store the
                positions exactly as received.
        """
        self.ser = serial.Serial(port=port,
                                baudrate=baudrate,
                                bytesize=serial.EIGHTBITS,
                                parity=serial.PARITY_NONE,
                                stopbits=serial.STOPBITS_ONE,)

        self.jitter_deg = jitter_deg

        # Storage for every decoded packet since start-up.
        self.data = []
        self.data_df = pd.DataFrame(None, columns=self.COLUMNS)
        self._n_sent = 0  # rows of `data_df` already pushed to the plots

        # Location scatter: the first column (x axis) is longitude, the second latitude.
        self.loc_stream = Buffer( pd.DataFrame({"lon":[],"lat":[]}), length=buff_len, index=False)
        self.loc_dmap = hv.DynamicMap(hv.Scatter,streams=[self.loc_stream]).opts(xlabel="longitude",ylabel="latitude")
        self.map_tiles  = EsriImagery().opts(alpha=0.5,bgcolor='white')

        self.sat_stream = Buffer( pd.DataFrame({"sats":[]}), length=buff_len)
        self.sat_dmap   = hv.DynamicMap(hv.Curve,streams=[self.sat_stream]).opts(xlabel="index",ylabel="number of satellites in view")

        self.temp_stream = Buffer( pd.DataFrame({"temp":[]}), length=buff_len)
        self.temp_dmap = hv.DynamicMap(hv.Curve,streams=[self.temp_stream]).opts(xlabel="index",ylabel="temperature (deg C)",toolbar=None)

        self.pressure_stream = Buffer( pd.DataFrame({"pressure":[]}), length=buff_len)
        self.pressure_dmap   = hv.DynamicMap(hv.Curve,streams=[self.pressure_stream]).opts(xlabel="index",ylabel="pressure (hPa)",toolbar=None)

        self.humidity_stream = Buffer( pd.DataFrame({"humidity":[]}), length=buff_len)
        self.humidity_dmap   = hv.DynamicMap(hv.Curve,streams=[self.humidity_stream]).opts(xlabel="index",ylabel="relative humidity %",toolbar=None)

        # IMU calibration status curves (overlaid in `__call__`).
        self.sys_stream   = Buffer( pd.DataFrame({"sys_cal":[]}),length=buff_len)
        self.sys_dmap     = hv.DynamicMap(hv.Curve,streams=[self.sys_stream],label="system")
        self.gyro_stream  = Buffer( pd.DataFrame({"gyro_cal":[]}),length=buff_len)
        self.gyro_dmap    = hv.DynamicMap(hv.Curve,streams=[self.gyro_stream],label="gyro")
        self.accel_stream = Buffer( pd.DataFrame({"accel_cal":[]}),length=buff_len)
        self.accel_dmap   = hv.DynamicMap(hv.Curve,streams=[self.accel_stream],label="accel")
        self.mag_stream   = Buffer( pd.DataFrame({"mag_cal":[]}),length=buff_len)
        self.mag_dmap     = hv.DynamicMap(hv.Curve,streams=[self.mag_stream],label="mag")

        # Widgets
        self.clear_btn = pn.widgets.Button(name='Clear', button_type='primary')
        self.msg = pn.widgets.StaticText(name="")
        self.clear_btn.on_click(self.clear_all)
        self.startstop_btn = pn.widgets.Button(name="Start",button_type="success")
        self.check_run = False
        self.startstop_btn.on_click(self.startstop)
        self.counter = 0

    def _column_streams(self):
        """Return (stream, column name) pairs for every single-column curve."""
        return (
            (self.sat_stream, "sats"),
            (self.temp_stream, "temp"),
            (self.pressure_stream, "pressure"),
            (self.humidity_stream, "humidity"),
            (self.sys_stream, "sys_cal"),
            (self.gyro_stream, "gyro_cal"),
            (self.accel_stream, "accel_cal"),
            (self.mag_stream, "mag_cal"),
        )

    def run(self):
        """Poll the port once per second, forever, while the dashboard is running.

        This loop never returns; interrupt it (KeyboardInterrupt) to stop and
        then call `close`.

        NOTE(review): the loop blocks the calling thread, so widget callbacks
        may not be serviced while it runs in a notebook kernel. This is
        presumably why the Start/Pause button appears unresponsive; confirm.
        """
        while True:
            time.sleep(1)
            if self.check_run:
                self.read()
                self.update()
                self.counter += 1
                self.msg.value = f"Running. Count = {self.counter}"

    def startstop(self,event):
        """Button callback: toggle between running and paused.

        Pausing also resets the tick counter shown in the status message.
        """
        self.check_run = not self.check_run
        if self.check_run:
            self.startstop_btn.name = "Pause"
            self.startstop_btn.button_type = "danger"
            self.msg.value = "Running"
        else:
            self.startstop_btn.name = "Start"
            self.startstop_btn.button_type = "success"
            self.msg.value = "Paused"
            self.counter = 0

    def __call__(self):
        """Assemble and return the servable Panel layout of the dashboard."""
        self.imu_stats = (self.sys_dmap*self.gyro_dmap*self.accel_dmap*self.mag_dmap).opts(ylabel="IMU calibration status",legend_position="left")
        return pn.Column(pn.Row(self.loc_dmap.opts(hv.opts.Scatter(size=10))*self.map_tiles,self.sat_dmap,self.imu_stats),
                         pn.Row(self.temp_dmap,self.pressure_dmap,self.humidity_dmap),
                         pn.Row(pn.Column(self.clear_btn,self.startstop_btn),self.msg)).servable()

    def close(self):
        """Close the serial port."""
        self.ser.close()

    def clear_all(self,event):
        """Button callback: empty every plot buffer.

        The recorded packets in `data` / `data_df` are kept; only the plots
        are cleared. Rows already plotted are not pushed again by `update`.
        """
        self.loc_stream.clear()
        for stream, _ in self._column_streams():
            stream.clear()
        self.msg.value = f"Cleared {self.clear_btn.clicks} time(s)"

    def update(self):
        """Push the rows of `data_df` that have not been plotted yet to the streams.

        Only new rows are sent: re-sending the whole history on every tick
        would duplicate points in the rolling buffers and undo `clear_all`.
        """
        if self._n_sent > len(self.data_df):
            # `data_df` was replaced by a shorter frame; start over.
            self._n_sent = 0
        new_rows = self.data_df.iloc[self._n_sent:]
        if new_rows.empty:
            return
        self._n_sent = len(self.data_df)

        # The map tiles use Web Mercator metres, so convert before plotting.
        easting, northing = lnglat_to_meters(new_rows["lon"], new_rows["lat"])
        self.loc_stream.send( pd.concat([easting, northing],axis=1) )
        for stream, column in self._column_streams():
            stream.send( new_rows[column].to_frame() )

    def _decode(self, line):
        """Decode one raw line into a row ordered like `COLUMNS`.

        Returns None when the line is too short or does not start with the
        packet header, since every field is read at a fixed offset from it.
        """
        if len(line) < self.MIN_PACKET_LEN or not line.startswith(self.HEADER):
            return None

        raw = np.frombuffer(line, dtype=np.uint8)
        # Index the one-element views explicitly: implicit array -> scalar
        # conversion is deprecated in recent NumPy releases.
        lat = float(raw[1:5].view(np.int32)[0])*1e-7  # latitude [deg]
        lon = float(raw[5:9].view(np.int32)[0])*1e-7  # longitude [deg]
        if self.jitter_deg:
            # WARNING: deliberately perturbs the stored position (see `__init__`).
            lat += np.random.rand()*self.jitter_deg
            lon += np.random.rand()*self.jitter_deg

        cal_char = raw[22]  # four 2-bit calibration levels packed in one byte
        return [
            lat,
            lon,
            raw[9],                            # number of satellites in view and used in compute
            raw[10:14].view(np.float32)[0],    # temperature [deg C]
            raw[14:18].view(np.float32)[0],    # pressure [hPa]
            raw[18:22].view(np.float32)[0],    # humidity [relative humidity %]
            (cal_char & 0b1100_0000) >> 6,     # system calibration
            (cal_char & 0b0011_0000) >> 4,     # gyro calibration
            (cal_char & 0b0000_1100) >> 2,     # accel calibration
            cal_char & 0b0000_0011,            # mag calibration
        ]

    def read(self,timeout:float=2):
        """Drain the serial port, decode every complete packet and refresh `data_df`.

        Args:
            timeout: maximum time [s] to spend draining. When exceeded,
                "timeout" is printed and the remaining input is discarded.
        """
        start_time = time.time()

        while self.ser.in_waiting > 0:
            # Check the deadline before reading so that a flood of malformed
            # lines cannot keep the loop alive, and no decoded line is dropped.
            if time.time()-start_time > timeout:
                print("timeout")
                self.ser.reset_input_buffer()
                break

            self.line_data = self.ser.readline()
            packet = self._decode(self.line_data)
            if packet is not None:
                self.data.append(packet)

        self.data_df = pd.DataFrame(self.data, columns=self.COLUMNS)

    
{% endraw %} {% raw %}
# Open the serial port with the default settings and create the streams and widgets.
ss = SensorStream()
# Calling the instance returns the servable Panel layout of the dashboard.
ss()
{% endraw %}

{% include warning.html content='Start/Pause button not working!' %}

{% raw %}
ss.run()
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-28-5f954822c345> in <module>
      1 #hardware
      2 
----> 3 ss.run()

<ipython-input-27-0f5674d17383> in run(self)
     68     def run(self):
     69         while True:
---> 70             time.sleep(1)
     71             if self.check_run:
     72                 self.read()

KeyboardInterrupt: 
{% endraw %} {% raw %}
ss.ser.close()
{% endraw %} {% raw %}
ss.data_df
lat lon sats temp pressure humidity sys_cal gyro_cal accel_cal mag_cal
0 -33.743084 152.002595 6 23.959999 1009.301147 57.174805 0 0 0 0
1 -33.628404 151.289836 6 23.969999 1009.341736 57.151367 0 0 0 0
2 -32.957323 151.410112 6 23.969999 1009.320557 57.151367 0 0 0 0
3 -32.974990 151.678872 6 23.990000 1009.344849 57.945312 0 0 0 0
4 -33.148673 151.656227 6 23.990000 1009.343933 58.179688 0 0 0 0
... ... ... ... ... ... ... ... ... ... ...
603 -32.979386 151.482988 9 24.820000 1009.469055 56.936523 3 3 3 3
604 -33.500686 151.774686 9 24.830000 1009.441711 56.960938 3 3 3 3
605 -33.438636 151.022355 9 24.820000 1009.461975 56.960938 3 3 3 3
606 -32.848062 151.060729 9 24.820000 1009.477783 56.998047 3 3 3 3
607 -33.395096 151.709758 9 24.809999 1009.464233 57.022461 3 3 3 3

608 rows × 10 columns

{% endraw %} {% raw %}
{% endraw %} {% raw %}

class SensorStream[source]

SensorStream(baudrate=921600, port='/dev/ttyTHS0', start_pin=27, save_dir='/xavier_ssd/data/')

{% endraw %} {% raw %}
{% endraw %} {% raw %}
peripherals = SensorStream(baudrate=921_600,port="/dev/ttyTHS0")
peripherals.run() # start on switch on, pause on switch off, KeyboardInterrupt to stop
{% endraw %} {% raw %}
# Call record() directly (with max_timeout=2) instead of run(), then print the collected `data`.
peripherals = SensorStream(baudrate=921_600,port="/dev/ttyTHS0")
peripherals.record(max_timeout=2)
print(peripherals.data)
{% endraw %} {% raw %}
print("hardware")
raise ValueError("Hardware flag entered")
hardware
{% endraw %}